/*
Copyright 2017 Google Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package zk

import (
	"math/rand"
	"sync"
	"time"

	log "github.com/golang/glog"
	zookeeper "github.com/samuel/go-zookeeper/zk"

	"github.com/youtube/vitess/go/stats"
)

// cachedConnStates is the stats string map created under the name
// "ZkCachedConn". setState stores the current connection state of a cell in
// it, keyed by the cell name. cachedConnStatesMutex is held by setState for
// the duration of each such update.
var (
	cachedConnStates      = stats.NewStringMap("ZkCachedConn")
	cachedConnStatesMutex sync.Mutex
)

// init seeds the global math/rand generator with the current wall-clock time
// in nanoseconds.
func init() {
	seed := time.Now().UnixNano()
	rand.Seed(seed)
}

// state is the state of the Zookeeper connection. It's not tracked in the
// connection struct itself, only in the stat variable cachedConnStates.
type state string

// The possible connection states. Each constant spells out the state type:
// in a const group the type is only repeated implicitly when the expression
// is omitted as well, so a constant written as `name = "..."` would be an
// untyped string constant rather than a state.
const (
	disconnected state = "Disconnected"
	connecting   state = "Connecting"
	connected    state = "Connected"
)

// cachedConn is the per-cell entry stored in ConnCache. It pairs a connection
// with the mutex that serializes access to it; ConnForPath keeps that mutex
// locked while it dials, so only one goroutine at a time connects for a cell.
type cachedConn struct {
	mutex sync.Mutex
	// zconn is guarded by mutex. A nil value means there is currently no
	// connection for the cell, which makes ConnForPath dial a new one.
	zconn Conn
}

// ConnCache is a cache for Zookeeper connections which guarantees that you have
// at most one zookeeper connection per cell.
//
// Instances are created with NewConnCache. Close sets zconnCellMap to nil,
// after which ConnForPath returns zookeeper.ErrClosing.
type ConnCache struct {
	// mutex guards zconnCellMap (the map itself, not the entries' connections,
	// which are protected by each cachedConn's own mutex).
	mutex        sync.Mutex
	zconnCellMap map[string]*cachedConn // maps cell name to its cached connection entry
}

// setState publishes the given connection state for cell zcell in the
// cachedConnStates stats map, holding cachedConnStatesMutex while doing so.
func (cc *ConnCache) setState(zcell string, state state) {
	value := string(state)

	cachedConnStatesMutex.Lock()
	defer cachedConnStatesMutex.Unlock()
	cachedConnStates.Set(zcell, value)
}

// ConnForPath hands out the connection of the cell that zkPath belongs to,
// dialing a new one when the cache holds none for that cell.
//
// Errors:
//   - zookeeper.ErrInvalidPath when no cell can be derived from zkPath,
//   - zookeeper.ErrClosing when the cache has already been closed,
//   - zookeeper.ErrNoNode when zkPath cannot be mapped to a server address,
//   - otherwise whatever error the dial attempt reports.
func (cc *ConnCache) ConnForPath(zkPath string) (cn Conn, err error) {
	cell, cellErr := ZkCellFromZkPath(zkPath)
	if cellErr != nil {
		return nil, zookeeper.ErrInvalidPath
	}

	// Look up the entry of the cell, creating an empty one on first use. A nil
	// map means Close has already run.
	var entry *cachedConn
	cc.mutex.Lock()
	closed := cc.zconnCellMap == nil
	if !closed {
		if existing, found := cc.zconnCellMap[cell]; found {
			entry = existing
		} else {
			entry = &cachedConn{}
			cc.zconnCellMap[cell] = entry
		}
	}
	cc.mutex.Unlock()
	if closed {
		return nil, zookeeper.ErrClosing
	}

	// The entry lock stays held for the whole dial so that concurrent callers
	// for the same cell wait instead of opening a second connection.
	entry.mutex.Lock()
	defer entry.mutex.Unlock()

	if cached := entry.zconn; cached != nil {
		return cached, nil
	}

	addr, addrErr := ZkPathToZkAddr(zkPath)
	if addrErr != nil {
		return nil, zookeeper.ErrNoNode
	}

	cc.setState(cell, connecting)
	entry.zconn, err = cc.newZookeeperConn(addr, cell)
	if entry.zconn == nil {
		cc.setState(cell, disconnected)
	} else {
		cc.setState(cell, connected)
	}
	return entry.zconn, err
}

// newZookeeperConn dials zkAddr via DialZkTimeout using the baseTimeout and
// connectTimeout settings. On success it starts a goroutine running
// handleSessionEvents for the cell zcell and returns the new connection; on
// failure it returns a nil connection together with the dial error.
func (cc *ConnCache) newZookeeperConn(zkAddr, zcell string) (Conn, error) {
	zconn, events, dialErr := DialZkTimeout(zkAddr, *baseTimeout, *connectTimeout)
	if dialErr != nil {
		return nil, dialErr
	}

	go cc.handleSessionEvents(zcell, zconn, events)
	return zconn, nil
}

// handleSessionEvents consumes the session event channel of a Zookeeper
// connection and invalidates the cached connection of the cell once the
// session is reported as no longer usable. newZookeeperConn starts it in its
// own goroutine.
//
// On StateDisconnected the cached connection of the cell is dropped, so the
// next ConnForPath call for that cell dials again. On StateExpired or
// StateConnecting the cached connection is additionally closed before being
// dropped. In all three cases the goroutine returns after handling the event.
// Any other event is only logged. The loop also ends when the session channel
// is closed.
//
// NOTE(review): the conn parameter is not used; the connection that gets
// closed is whatever the cache currently holds for the cell.
func (cc *ConnCache) handleSessionEvents(cell string, conn Conn, session <-chan zookeeper.Event) {
	closeRequired := false
	for event := range session {
		switch event.State {
		case zookeeper.StateExpired, zookeeper.StateConnecting:
			closeRequired = true
			fallthrough
		case zookeeper.StateDisconnected:
			var cached *cachedConn
			cc.mutex.Lock()
			if cc.zconnCellMap != nil {
				cached = cc.zconnCellMap[cell]
			}
			cc.mutex.Unlock()

			// Keep the entry in the map, but nil the Conn (that will
			// trigger a re-dial next time a connection is requested).
			if cached != nil {
				cached.mutex.Lock()
				// cc.mutex was released above, so ConnCache.Close may have
				// closed and cleared the connection in the meantime. Calling
				// Close on the nil interface value would panic, hence the
				// nil check.
				if closeRequired && cached.zconn != nil {
					cached.zconn.Close()
				}
				cached.zconn = nil
				cc.setState(cell, disconnected)
				cached.mutex.Unlock()
			}

			log.Infof("zk conn cache: session for cell %v ended: %v", cell, event)
			return
		default:
			log.Infof("zk conn cache: session for cell %v event: %v", cell, event)
		}
	}
}

// Close shuts down every connection currently held by the cache and then
// drops the cell map, which makes later ConnForPath calls fail with
// zookeeper.ErrClosing. It always returns nil.
func (cc *ConnCache) Close() error {
	cc.mutex.Lock()
	defer cc.mutex.Unlock()

	for _, entry := range cc.zconnCellMap {
		entry.mutex.Lock()
		if open := entry.zconn; open != nil {
			open.Close()
			entry.zconn = nil
		}
		entry.mutex.Unlock()
	}

	cc.zconnCellMap = nil
	return nil
}

// NewConnCache returns an empty, ready-to-use Zookeeper connection cache.
func NewConnCache() *ConnCache {
	cache := new(ConnCache)
	cache.zconnCellMap = map[string]*cachedConn{}
	return cache
}
